# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations

from pathlib import Path
from typing import TypedDict

import localstack.services.cloudformation.provider_utils as util
from localstack.services.cloudformation.resource_provider import (
    OperationStatus,
    ProgressEvent,
    ResourceProvider,
    ResourceRequest,
)


class SchedulerScheduleProperties(TypedDict):
    FlexibleTimeWindow: FlexibleTimeWindow | None
    ScheduleExpression: str | None
    Target: Target | None
    Arn: str | None
    Description: str | None
    EndDate: str | None
    GroupName: str | None
    KmsKeyArn: str | None
    Name: str | None
    ScheduleExpressionTimezone: str | None
    StartDate: str | None
    State: str | None


class FlexibleTimeWindow(TypedDict):
    Mode: str | None
    MaximumWindowInMinutes: float | None


class DeadLetterConfig(TypedDict):
    Arn: str | None


class RetryPolicy(TypedDict):
    MaximumEventAgeInSeconds: float | None
    MaximumRetryAttempts: float | None


class AwsVpcConfiguration(TypedDict):
    Subnets: list[str] | None
    AssignPublicIp: str | None
    SecurityGroups: list[str] | None


class NetworkConfiguration(TypedDict):
    AwsvpcConfiguration: AwsVpcConfiguration | None


class CapacityProviderStrategyItem(TypedDict):
    CapacityProvider: str | None
    Base: float | None
    Weight: float | None


class PlacementConstraint(TypedDict):
    Expression: str | None
    Type: str | None


class PlacementStrategy(TypedDict):
    Field: str | None
    Type: str | None


class EcsParameters(TypedDict):
    TaskDefinitionArn: str | None
    CapacityProviderStrategy: list[CapacityProviderStrategyItem] | None
    EnableECSManagedTags: bool | None
    EnableExecuteCommand: bool | None
    Group: str | None
    LaunchType: str | None
    NetworkConfiguration: NetworkConfiguration | None
    PlacementConstraints: list[PlacementConstraint] | None
    PlacementStrategy: list[PlacementStrategy] | None
    PlatformVersion: str | None
    PropagateTags: str | None
    ReferenceId: str | None
    Tags: list[dict] | None
    TaskCount: float | None


class EventBridgeParameters(TypedDict):
    DetailType: str | None
    Source: str | None


class KinesisParameters(TypedDict):
    PartitionKey: str | None


class SageMakerPipelineParameter(TypedDict):
    Name: str | None
    Value: str | None


class SageMakerPipelineParameters(TypedDict):
    PipelineParameterList: list[SageMakerPipelineParameter] | None


class SqsParameters(TypedDict):
    MessageGroupId: str | None


class Target(TypedDict):
    Arn: str | None
    RoleArn: str | None
    DeadLetterConfig: DeadLetterConfig | None
    EcsParameters: EcsParameters | None
    EventBridgeParameters: EventBridgeParameters | None
    Input: str | None
    KinesisParameters: KinesisParameters | None
    RetryPolicy: RetryPolicy | None
    SageMakerPipelineParameters: SageMakerPipelineParameters | None
    SqsParameters: SqsParameters | None


REPEATED_INVOCATION = "repeated_invocation"


class SchedulerScheduleProvider(ResourceProvider[SchedulerScheduleProperties]):
    TYPE = "AWS::Scheduler::Schedule"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[SchedulerScheduleProperties],
    ) -> ProgressEvent[SchedulerScheduleProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/Name

        Required properties:
          - FlexibleTimeWindow
          - ScheduleExpression
          - Target

        Create-only properties:
          - /properties/Name

        Read-only properties:
          - /properties/Arn

        IAM permissions required:
          - scheduler:CreateSchedule
          - scheduler:GetSchedule
          - iam:PassRole

        """
        model = request.desired_state

        if not model.get("Name"):
            model["Name"] = util.generate_default_name(
                request.stack_name, request.logical_resource_id
            )

        create_params = util.select_attributes(
            model,
            [
                "Description",
                "EndDate",
                "FlexibleTimeWindow",
                "GroupName",
                "KmsKeyArn",
                "Name",
                "ScheduleExpression",
                "ScheduleExpressionTimezone",
                "StartDate",
                "State",
                "Target",
            ],
        )

        result = request.aws_client_factory.scheduler.create_schedule(**create_params)
        model["Arn"] = result["ScheduleArn"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
        )

    def read(
        self,
        request: ResourceRequest[SchedulerScheduleProperties],
    ) -> ProgressEvent[SchedulerScheduleProperties]:
        """
        Fetch resource information

        IAM permissions required:
          - scheduler:GetSchedule
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[SchedulerScheduleProperties],
    ) -> ProgressEvent[SchedulerScheduleProperties]:
        """
        Delete a resource

        IAM permissions required:
          - scheduler:DeleteSchedule
          - scheduler:GetSchedule
        """

        delete_params = util.select_attributes(request.desired_state, ["Name", "GroupName"])
        request.aws_client_factory.scheduler.delete_schedule(**delete_params)
        return ProgressEvent(status=OperationStatus.SUCCESS, resource_model={})

    def update(
        self,
        request: ResourceRequest[SchedulerScheduleProperties],
    ) -> ProgressEvent[SchedulerScheduleProperties]:
        """
        Update a resource

        IAM permissions required:
          - scheduler:UpdateSchedule
          - scheduler:GetSchedule
          - iam:PassRole
        """
        raise NotImplementedError
